{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create entry points to spark\n",
    "try:\n",
    "    sc.stop()  # stop a context left over from an earlier run\n",
    "except NameError:\n",
    "    pass  # no context has been created yet\n",
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "sc = SparkContext()\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Map functions\n",
    "\n",
    "The following functions are among the most commonly used when working with an RDD object.\n",
    "\n",
    "* `map()`\n",
    "* `mapValues()`\n",
    "* `flatMap()`\n",
    "* `flatMapValues()`\n",
    "\n",
    "### `map`\n",
    "\n",
    "The `map()` method applies a function to each element of the RDD. Each element has to be a valid input to the function. The returned RDD has the function outputs as its new elements.\n",
    "\n",
    "Elements in the RDD object `map_exp_rdd` below are rows of the [mtcars](data/mtcars.csv) dataset in string format. We are going to apply the `map()` function several times to convert each string element into a tuple of two items: the first item is the auto model as a string; the second item is a list of numeric values."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',\n",
       " 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',\n",
       " 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',\n",
       " 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1']"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# create an example RDD\n",
    "map_exp_rdd = sc.textFile('../../data/mtcars.csv')\n",
    "map_exp_rdd.take(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('',\n",
       "  ['mpg',\n",
       "   'cyl',\n",
       "   'disp',\n",
       "   'hp',\n",
       "   'drat',\n",
       "   'wt',\n",
       "   'qsec',\n",
       "   'vs',\n",
       "   'am',\n",
       "   'gear',\n",
       "   'carb']),\n",
       " ('Mazda RX4',\n",
       "  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),\n",
       " ('Mazda RX4 Wag',\n",
       "  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),\n",
       " ('Datsun 710',\n",
       "  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1'])]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# split auto model from other feature values\n",
    "map_exp_rdd_1 = map_exp_rdd.map(lambda x: x.split(',')).map(lambda x: (x[0], x[1:]))\n",
    "map_exp_rdd_1.take(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Mazda RX4',\n",
       "  ['21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']),\n",
       " ('Mazda RX4 Wag',\n",
       "  ['21', '6', '160', '110', '3.9', '2.875', '17.02', '0', '1', '4', '4']),\n",
       " ('Datsun 710',\n",
       "  ['22.8', '4', '108', '93', '3.85', '2.32', '18.61', '1', '1', '4', '1']),\n",
       " ('Hornet 4 Drive',\n",
       "  ['21.4', '6', '258', '110', '3.08', '3.215', '19.44', '1', '0', '3', '1'])]"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# remove the header row\n",
    "header = map_exp_rdd_1.first()\n",
    "# filter() applies a function that returns a boolean (True or False) to each element;\n",
    "# only elements for which the function returns True are kept.\n",
    "map_exp_rdd_2 = map_exp_rdd_1.filter(lambda x: x != header)\n",
    "map_exp_rdd_2.take(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Mazda RX4',\n",
       "  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),\n",
       " ('Mazda RX4 Wag',\n",
       "  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),\n",
       " ('Datsun 710',\n",
       "  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),\n",
       " ('Hornet 4 Drive',\n",
       "  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# convert string values to numeric values\n",
    "map_exp_rdd_3 = map_exp_rdd_2.map(lambda x: (x[0], list(map(float, x[1]))))\n",
    "map_exp_rdd_3.take(4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `mapValues`\n",
    "\n",
    "The `mapValues` function requires that each element in the RDD has a **key/value** pair structure, for example, a tuple of 2 items, or a list of 2 items. It applies a function to the value of each element; the key remains unchanged.\n",
    "\n",
    "We can apply the `mapValues` function to the RDD object `mapValues_exp_rdd` below.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Mazda RX4',\n",
       "  [21.0, 6.0, 160.0, 110.0, 3.9, 2.62, 16.46, 0.0, 1.0, 4.0, 4.0]),\n",
       " ('Mazda RX4 Wag',\n",
       "  [21.0, 6.0, 160.0, 110.0, 3.9, 2.875, 17.02, 0.0, 1.0, 4.0, 4.0]),\n",
       " ('Datsun 710',\n",
       "  [22.8, 4.0, 108.0, 93.0, 3.85, 2.32, 18.61, 1.0, 1.0, 4.0, 1.0]),\n",
       " ('Hornet 4 Drive',\n",
       "  [21.4, 6.0, 258.0, 110.0, 3.08, 3.215, 19.44, 1.0, 0.0, 3.0, 1.0])]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mapValues_exp_rdd = map_exp_rdd_3\n",
    "mapValues_exp_rdd.take(4)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Mazda RX4', 29.90727272727273),\n",
       " ('Mazda RX4 Wag', 29.98136363636364),\n",
       " ('Datsun 710', 23.59818181818182),\n",
       " ('Hornet 4 Drive', 38.73954545454546)]"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import numpy as np\n",
    "# average the numeric values of each element; the key (auto model) is left untouched\n",
    "mapValues_exp_rdd_1 = mapValues_exp_rdd.mapValues(lambda x: np.mean(x))\n",
    "mapValues_exp_rdd_1.take(4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When using `mapValues()`, the `x` in the lambda function above refers only to the element value; the element key is not passed to the function."
   ]
  },
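  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the key/value behaviour concrete, here is a minimal plain-Python sketch of what `mapValues()` does to a list of pairs. The helper `map_values` is hypothetical and not part of the Spark API; it only mimics the semantics on a local list, assuming every element is a two-item pair.\n",
    "\n",
    "```python\n",
    "# Plain-Python stand-in for RDD.mapValues(); `map_values` is a hypothetical helper.\n",
    "def map_values(pairs, func):\n",
    "    # Apply func to the value only; the key passes through unchanged.\n",
    "    return [(key, func(value)) for key, value in pairs]\n",
    "\n",
    "pairs = [('a', [1.0, 3.0]), ('b', [2.0, 4.0, 6.0])]\n",
    "means = map_values(pairs, lambda v: sum(v) / len(v))\n",
    "print(means)\n",
    "```\n",
    "\n",
    "On an RDD, `rdd.mapValues(func)` should give the same elements as the longer `rdd.map(lambda kv: (kv[0], func(kv[1])))`."
   ]
  },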
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `flatMap`\n",
    "\n",
    "This function first applies a function to each element of an RDD and then flattens the results, so the function must return an iterable for each element. With the identity function `lambda x: x`, it simply flattens the elements of an RDD without any other transformation.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "x = [('a', 'b', 'c'), ('a', 'a'), ('c', 'c', 'c', 'd')]\n",
    "flatMap_exp_rdd = sc.parallelize(x)\n",
    "flatMap_exp_rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['a', 'b', 'c', 'a', 'a', 'c', 'c', 'c', 'd']"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "flatMap_exp_rdd_1 = flatMap_exp_rdd.flatMap(lambda x: x)\n",
    "flatMap_exp_rdd_1.collect()"
   ]
  },
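  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The difference between `map()` and `flatMap()` may be easier to see side by side. The sketch below imitates both on a local list in plain Python; `local_map` and `local_flat_map` are hypothetical helpers, not Spark functions, and the upper-casing function is just an illustrative choice.\n",
    "\n",
    "```python\n",
    "from itertools import chain\n",
    "\n",
    "# Plain-Python stand-ins; `local_map` and `local_flat_map` are hypothetical helpers.\n",
    "def local_map(elements, func):\n",
    "    # One output element per input element.\n",
    "    return [func(e) for e in elements]\n",
    "\n",
    "def local_flat_map(elements, func):\n",
    "    # func returns an iterable per element; the iterables are concatenated.\n",
    "    return list(chain.from_iterable(func(e) for e in elements))\n",
    "\n",
    "data = [('a', 'b', 'c'), ('a', 'a')]\n",
    "mapped = local_map(data, lambda t: [s.upper() for s in t])\n",
    "flattened = local_flat_map(data, lambda t: [s.upper() for s in t])\n",
    "print(mapped)     # nested: one list per input tuple\n",
    "print(flattened)  # flat: a single list of strings\n",
    "```\n",
    "\n",
    "With the same function, the mapped result keeps one element per input tuple, while the flat-mapped result has one element per string."
   ]
  },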
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### `flatMapValues`\n",
    "\n",
    "The `flatMapValues` function requires that each element in the RDD has a **key/value** pair structure. It applies a function to each **element value** of the RDD object and then flattens the results, pairing every item of the result with the original key.\n",
    "\n",
    "For example, suppose the raw data looks like the table below, and we would like to reshape it into three columns: the first column is the **sample id**; the second column is the **type** (A, B or C); the third column is the **value**.\n",
    "\n",
    "| sample id |  A |  B |  C |\n",
    "|:---------:|:--:|:--:|:--:|\n",
    "|     1     | 23 | 28 | 32 |\n",
    "|     2     | 18 | 29 | 31 |\n",
    "|     3     | 34 | 21 | 18 |"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[1, (23, 28, 32)], [2, (18, 29, 31)], [3, (34, 21, 18)]]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# example data\n",
    "my_data = [\n",
    "    [1, (23, 28, 32)],\n",
    "    [2, (18, 29, 31)],\n",
    "    [3, (34, 21, 18)]\n",
    "]\n",
    "flatMapValues_exp_rdd = sc.parallelize(my_data)\n",
    "flatMapValues_exp_rdd.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[(1, ('A', 23)),\n",
       " (1, ('B', 28)),\n",
       " (1, ('C', 32)),\n",
       " (2, ('A', 18)),\n",
       " (2, ('B', 29)),\n",
       " (2, ('C', 31)),\n",
       " (3, ('A', 34)),\n",
       " (3, ('B', 21)),\n",
       " (3, ('C', 18))]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# merge the A, B and C columns into one column and add the type column\n",
    "flatMapValues_exp_rdd_1 = flatMapValues_exp_rdd.flatMapValues(lambda x: list(zip('ABC', x)))\n",
    "flatMapValues_exp_rdd_1.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[[1, 'A', 23],\n",
       " [1, 'B', 28],\n",
       " [1, 'C', 32],\n",
       " [2, 'A', 18],\n",
       " [2, 'B', 29],\n",
       " [2, 'C', 31],\n",
       " [3, 'A', 34],\n",
       " [3, 'B', 21],\n",
       " [3, 'C', 18]]"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# unpack each (type, value) tuple into a flat [sample id, type, value] row\n",
    "flatMapValues_exp_rdd_2 = flatMapValues_exp_rdd_1.map(lambda x: [x[0]] + list(x[1]))\n",
    "flatMapValues_exp_rdd_2.collect()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
